【Python】ExcelのTable定義書からRedshift DDLを自動生成

【Python】ExcelのTable定義書からRedshift DDLを自動生成

Clock Icon2023.11.29

はじめに

こんにちは。データアナリティクス事業本部ビッグデータチームのyosh-kです。
今回は、ExcelのTable定義書からRedshiftのDDLを作成するPython実装を行ってみたいと思います。

前提条件

  • 以下リンクのExcelファイルを対象とします。
  • Redshiftの対応データ型は以下AWS公式記載の19個を対象とします。
  • spectrumやviewのddlには対応していないので、別途回収していただく必要があります。

No.1-19では、それぞれのデータ型のNot NULL制約、デフォルト値の確認用のテストデータ。No.20-38では、PRIMARY KEY制約の確認用のテストデータ。No.39-57では、上記制約を除いた確認用のテストデータとなります。

実装

実装したソースコードはGithubに格納してあります。

30_create_redshift_ddls

フォルダ構成は以下となります。

.
├── README.md
├── input
│   └── 正常系table定義.xlsx
├── lib
│   ├── environments.py
│   ├── excel_processing.py
│   ├── sql_processing.py
│   └── table_info.py
├── main.py
├── output
│   └── dwh.test.sql
└── requirements.txt

3 directories, 9 files

まず、必要なライブラリをrequrements.txtからinstallします。

pip install -r requirements.txt

main.py

main.pyでは、他のライブラリを呼び出すmain処理を実装しています。 処理の流れとしては、大きく以下になります。

  1. 環境変数読み込み
  2. Excelファイル読み込み
  3. フォルダ配下のファイル数分繰り返し、pandasを用いてデータを取得。
  4. table情報からDDLを整形
  5. sqlファイルとして出力
import glob
import warnings
import traceback
from lib.table_info import TableInfo
from lib.environments import get_env
from lib.excel_processing import extract_schema_table_name_from_excel, extract_table_data_from_excel
from lib.sql_processing import make_column_definition

# Excelファイルにデータ検証(Data Validation)の拡張が含まれているけど、
# その拡張は openpyxl ライブラリでサポートされていないという意味です。
# そのため、読み込むExcelファイルがこの拡張を必要としない場合には問題ありません。
warnings.filterwarnings("ignore", message="Data Validation extension is not supported and will be removed")


def write_to_file(s_name: str, t_name: str, ddl: str) -> None:
    """ "SQL文をファイルに書き込み"""
    with open(f"./output/{s_name}.{t_name}.sql", "w", encoding="utf-8") as file:
        file.write(ddl)


def main():
    try:
        env = get_env()
        files = [file for file in glob.glob("./input/*.xlsx") if "~$" not in file]
        if not files:
            raise ValueError("file did not exist.")
        print("files:", files)
        for file in files:
            schema_name, table_name = extract_schema_table_name_from_excel(file, env)
            extract_table_data = extract_table_data_from_excel(file, env)
            column_lengths = extract_table_data[env["COLUMN_NAME"]].apply(lambda x: len(str(x)))
            max_column_length = max(column_lengths)
            ddl = f"CREATE TABLE IF NOT EXISTS {schema_name}.{table_name}(\n"
            table_info = TableInfo(ddl, max_column_length)
            for _, row in extract_table_data.iterrows():
                table_info.column_name = row[env["COLUMN_NAME"]]
                table_info.data_type = row[env["DATA_TYPE"]]
                table_info.digits = row[env["DIGITS"]]
                table_info.decimal_part = row[env["DECIMAL_PART"]]
                table_info.primary_key = row[env["PRIMARY_KEY"]]
                table_info.is_not_null = row[env["NOT_NULL"]]
                table_info.default_value = row[env["DEFAULT_VALUE"]]
                # 全て含む場合
                if {
                    table_info.column_name,
                    table_info.decimal_part,
                    table_info.data_type,
                    table_info.digits,
                    table_info.primary_key,
                    table_info.default_value,
                } == {"-"}:
                    continue
                elif table_info.data_type in env["REDSHIFT_DATA_TYPES"].split(","):
                    if table_info.primary_key != "-":
                        table_info.primary_key_list = table_info.column_name
                    make_column_definition(table_info)
                else:
                    print("error record:", vars(table_info))
                    raise ValueError("There is no data type. Tracking your self or  ask the administorator")
            if table_info.primary_key_list != []:
                table_info.ddl += "    PRIMARY KEY (" + ", ".join(table_info.primary_key_list) + ")\n"
            table_info.ddl = table_info.ddl.rstrip(",\n") + "\n)\nDISTSTYLE AUTO\nSORTKEY AUTO;"
            write_to_file(schema_name, table_name, table_info.ddl)
        print("successfull")
    except Exception:
        print("error file:", file)
        traceback.print_exc()


if __name__ == "__main__":
    main()


要点だけ確認していきます。 openpyxl ライブラリは、Excelファイルの読み込み時に、「Data Validation extension is not supported and will be removed」という警告を出します。 これは、Excelファイルにデータ検証(Data Validation)の拡張が含まれていますが、その拡張はopenpyxl ライブラリでサポートされていないという意味になります。読み込むExcelファイルがこの拡張を必要としない場合には問題ないため、警告を無視するように実装しています。

warnings.filterwarnings("ignore", message="Data Validation extension is not supported and will be removed")

globを用いて、input配下の拡張子xlsxのファイルを抽出します。抽出時に~$というExcelファイルをOpen時に作成されるtmpファイルは除くようにしています。

files = [file for file in glob.glob("./input/*.xlsx") if "~$" not in file]

PandasのDataFrameの特定の列(env["COLUMN_NAME"] という環境変数で指定されている列)を対象に、その各行の要素(値)の文字列長を計算し、その結果を新たなcolumn_lengthsとして返しています。 具体的には、apply関数を使って lambda x: len(str(x)) という無名関数(lambda関数)を各行に適用しています。このlambda関数はその引数 x を文字列に変換 str(x) し、その長さ len() を返すものです。

column_lengths = table[env["COLUMN_NAME"]].apply(lambda x: len(str(x)))

全てのtable_infoが-の場合は、空レコードなのでcontinueし、環境変数で定義した、Redshift データ型にMatchする場合にddlを作成します。Matchしない行がある場合は、例外処理をします。

# 全て含む場合
if {
    table_info.column_name,
    table_info.decimal_part,
    table_info.data_type,
    table_info.digits,
    table_info.primary_key,
    table_info.default_value,
} == {"-"}:
    continue
elif table_info.data_type in env["REDSHIFT_DATA_TYPES"].split(","):
    if table_info.primary_key != "-":
        table_info.primary_key_list = table_info.column_name
    make_column_definition(table_info)
else:
    print("error record:", vars(table_info))
    raise ValueError("There is no data type. Tracking your self or  ask the administorator")

主キーが存在する場合は、主キーの記載を行います。DDLの最後にDISTSTYLE AUTOSORTKEY AUTOを付与していますが、必要に応じて適切な設定を選択することが重要となります。データ分散スタイル(DISTSTYLE)はテーブルのデータがRedshiftクラスタ内のノードにどのように分散されるかを決定します。AUTOを指定すると、Redshiftはテーブルのサイズ(行数やデータ量)を見て、ALL、KEY、EVENの最適なものに自動的に切り替えます。ソートキー(SORTKEY)AUTOを指定すると、Redshiftは実行されるクエリのパターンを見て、一連のソートキーを自動的に調整してテーブルをソートします。そのため、データが増加してもクエリパフォーマンスを保てます。

if table_info.primary_key_list != []:
    table_info.ddl += "    PRIMARY KEY (" + ", ".join(table_info.primary_key_list) + ")\n"
table_info.ddl = table_info.ddl.rstrip(",\n") + "\n)\nDISTSTYLE AUTO\nSORTKEY AUTO;"

environments.py

下記ファイルでは、table定義書の環境変数をそれぞれ記載していきます。また、REDSHIFT_DATA_TYPESにて対応のRedshiftデータ型を定義しています。

import os


def get_env():
    # Excelシート名
    os.environ["SHEET_NAME"] = os.environ.get("SHEET_NAME", "table定義")
    # カラム名の列名
    os.environ["COLUMN_NAME"] = os.environ.get("COLUMN_NAME", "カラム名(物理)")
    # データ型の列名
    os.environ["DATA_TYPE"] = os.environ.get("DATA_TYPE", "データ型")
    # 桁数の列名
    os.environ["DIGITS"] = os.environ.get("DIGITS", "桁数")
    # 小数点以下桁数の列名
    os.environ["DECIMAL_PART"] = os.environ.get("DECIMAL_PART", "小数点以下桁数")
    # 主キーの列名
    os.environ["PRIMARY_KEY"] = os.environ.get("PRIMARY_KEY", "PRIMARY KEY")
    # Not NULL制約の列名
    os.environ["NOT_NULL"] = os.environ.get("NOT_NULL", "NOT NULL")
    # デフォルト値の列名
    os.environ["DEFAULT_VALUE"] = os.environ.get("DEFAULT_VALUE", "デフォルト値")
    # table nameが格納されている列番号
    os.environ["TABLE_NAME_LOCATION"] = os.environ.get("TABLE_NAME_LOCATION", "1")
    # schema nameが格納されている列番号
    os.environ["SCHEMA_NAME_LOCATION"] = os.environ.get("SCHEMA_NAME_LOCATION", "5")

    os.environ["REDSHIFT_DATA_TYPES"] = ",".join(
        [
            "SMALLINT",
            "INTEGER",
            "BIGINT",
            "DECIMAL",
            "REAL",
            "DOUBLE PRECISION",
            "BOOLEAN",
            "CHAR",
            "VARCHAR",
            "DATE",
            "TIMESTAMP",
            "TIMESTAMPTZ",
            "GEOMETRY",
            "GEOGRAPHY",
            "HLLSKETCH",
            "SUPER",
            "TIME",
            "TIMETZ",
            "VARBYTE",
        ]
    )
    return os.environ

excel_processing.py

下記ファイルでは、Excelからデータを抽出、加工する処理を実装しています。

import pandas as pd


def extract_schema_table_name_from_excel(file: str, env):
    """Excelファイルからtable_name,schema_nameを抽出"""
    header_df = pd.read_excel(file, sheet_name=env["SHEET_NAME"], nrows=0)
    schema_name = header_df.columns[int(env["SCHEMA_NAME_LOCATION"])]
    table_name = header_df.columns[int(env["TABLE_NAME_LOCATION"])]
    return schema_name, table_name


def extract_table_data_from_excel(file: str, env) -> pd.DataFrame:
    """Excelファイルからデータフレームを抽出"""
    extract_df = pd.read_excel(file, sheet_name=env["SHEET_NAME"], header=2, dtype=str)
    col_map = {
        "column_name": env["COLUMN_NAME"],
        "data_type": env["DATA_TYPE"],
        "digits": env["DIGITS"],
        "decimal_part": env["DECIMAL_PART"],
        "primary_key": env["PRIMARY_KEY"],
        "not_null": env["NOT_NULL"],
        "default_value": env["DEFAULT_VALUE"],
    }
    extracted_data = extract_df[[col_map[key] for key in col_map if key in col_map]]
    return clean_table(extracted_data)


def clean_table(
    df: pd.DataFrame,
) -> pd.DataFrame:
    """データフレームのクリーニング"""
    # 文字列型のデータの欠損値を"-"に変換します
    return df.fillna("-")

nrows=0と指定することで先頭行のみを抽出しています。

header_df = pd.read_excel(file, sheet_name=env["SHEET_NAME"], nrows=0)

header=2と指定し、2行目をheaderとし、dtype=strとすることで全てstringで抽出するようにしています。

extract_df = pd.read_excel(file, sheet_name=env["SHEET_NAME"], header=2, dtype=str)

mapに含まれているカラムのみをdfで使用するようにします。

col_map = {
        "column_name": env["COLUMN_NAME"],
        "data_type": env["DATA_TYPE"],
        "digits": env["DIGITS"],
        "decimal_part": env["DECIMAL_PART"],
        "primary_key": env["PRIMARY_KEY"],
        "not_null": env["NOT_NULL"],
        "default_value": env["DEFAULT_VALUE"],
    }
    extracted_data = extract_df[[col_map[key] for key in col_map if key in col_map]]

sql_processing.py

下記ファイルで、ddl定義を行っています。{file_info.column_name:<{file_info.max_column_length}}と最初に定義することで、それぞれのカラムごとにフォーマットのばらつきが出ないように調整しています。file_info.digits(文字桁数)file_info.decimal_part(小数点以下桁数)file_info.is_not_null(Not Null), file_info.default_value(デフォルト値)にそれぞれ値がある場合は、追記するような条件分岐となっています。

def make_column_definition(file_info):
    sql_line = f"    {file_info.column_name:<{file_info.max_column_length}} {file_info.data_type}"

    # Add digits and decimal part if needed
    if file_info.digits != "-":
        if file_info.decimal_part != "-":
            sql_line += f"({int(file_info.digits)},{file_info.decimal_part})"
        else:
            sql_line += f"({int(file_info.digits)})"

    # Add NOT NULL if needed
    if file_info.is_not_null != "-":
        sql_line += " NOT NULL"
    # Add default value if needed
    if file_info.default_value != "-":
        sql_line += f" DEFAULT '{file_info.default_value}'"

    file_info.ddl += sql_line + ",\n"

table_info.py

下記ファイルでは、データのset,get用のTableInfoClassを定義しています。

class TableInfo:
    def __init__(self, ddl, max_column_length):
        self.__ddl = ddl
        self.__data_type = ""
        self.__digits = ""
        self.__is_not_null = ""
        self.__column_name = ""
        self.__max_column_length = max_column_length
        self.__decimal_part = 0
        self.__primary_key = ""
        self.__default_value = ""

        self.__primary_key_list = []

    # gettter method
    @property
    def ddl(self):
        return self.__ddl

    @property
    def data_type(self):
        return self.__data_type

    @property
    def digits(self):
        return self.__digits

    @property
    def is_not_null(self):
        return self.__is_not_null

    @property
    def column_name(self):
        return self.__column_name

    @property
    def max_column_length(self):
        return self.__max_column_length

    @property
    def decimal_part(self):
        return self.__decimal_part

    @property
    def primary_key(self):
        return self.__primary_key

    @property
    def primary_key_list(self):
        return self.__primary_key_list

    @property
    def default_value(self):
        return self.__default_value

    # setter method
    @ddl.setter
    def ddl(self, ddl):
        self.__ddl = ddl

    @data_type.setter
    def data_type(self, data_type):
        self.__data_type = data_type

    @digits.setter
    def digits(self, digits):
        self.__digits = digits

    @is_not_null.setter
    def is_not_null(self, is_not_null):
        self.__is_not_null = is_not_null

    @column_name.setter
    def column_name(self, column_name):
        self.__column_name = column_name

    @max_column_length.setter
    def max_column_length(self, max_column_length):
        self.__max_column_length = max_column_length

    @decimal_part.setter
    def decimal_part(self, decimal_part):
        self.__decimal_part = decimal_part

    @primary_key.setter
    def primary_key(self, primary_key):
        self.__primary_key = primary_key

    @primary_key_list.setter
    def primary_key_list(self, primary_key_list):
        self.__primary_key_list.append(primary_key_list)

    @default_value.setter
    def default_value(self, default_value):
        self.__default_value = default_value

実行結果

それではmain.pyを実行してみます。

(ddl_env) kasama.yoshiki@ 30_create_redshift_ddls % python main.py
files: ['./input/正常系table定義.xlsx']
successfull
(ddl_env) kasama.yoshiki@30_create_redshift_ddls % 

outputフォルダに対象のddlが作成されました。以下の観点でも問題ないことを確認できました。

  • col_column_name_1: Not NULL,デフォルト値桁数``小数点以下桁数
  • col_column_name_2: primary key
  • col_column_name_3: 制約なし
CREATE TABLE IF NOT EXISTS dwh.test(
    col_smallint_1    SMALLINT NOT NULL DEFAULT '0',
    col_integer_1     INTEGER NOT NULL DEFAULT '0',
    col_bigint_1      BIGINT NOT NULL DEFAULT '0',
    col_decimal_1     DECIMAL(5,2) NOT NULL DEFAULT '0.0',
    col_real_1        REAL NOT NULL DEFAULT '0.0',
    col_double_1      DOUBLE PRECISION NOT NULL DEFAULT '0.0',
    col_boolean_1     BOOLEAN NOT NULL DEFAULT 'True',
    col_char_1        CHAR(10) NOT NULL,
    col_varchar_1     VARCHAR(255) NOT NULL,
    col_date_1        DATE NOT NULL DEFAULT '2020-01-01',
    col_timestamp_1   TIMESTAMP NOT NULL DEFAULT '2000-01-01 00:00:00',
    col_timestamptz_1 TIMESTAMPTZ NOT NULL DEFAULT '2000-01-01 00:00:00+09',
    col_geometry_1    GEOMETRY NOT NULL,
    col_geography_1   GEOGRAPHY NOT NULL,
    col_hllsketch_1   HLLSKETCH NOT NULL,
    col_super_1       SUPER NOT NULL DEFAULT '{"key": "value"}',
    col_time_1        TIME NOT NULL DEFAULT '00:00:00',
    col_timetz_1      TIMETZ NOT NULL DEFAULT '00:00:00+09',
    col_varbyte_1     VARBYTE NOT NULL,
    col_smallint_2    SMALLINT,
    col_integer_2     INTEGER,
    col_bigint_2      BIGINT,
    col_decimal_2     DECIMAL(5,2),
    col_real_2        REAL,
    col_double_2      DOUBLE PRECISION,
    col_boolean_2     BOOLEAN,
    col_char_2        CHAR(10),
    col_varchar_2     VARCHAR(255),
    col_date_2        DATE,
    col_timestamp_2   TIMESTAMP,
    col_timestamptz_2 TIMESTAMPTZ,
    col_geometry_2    GEOMETRY,
    col_geography_2   GEOGRAPHY,
    col_hllsketch_2   HLLSKETCH,
    col_super_2       SUPER,
    col_time_2        TIME,
    col_timetz_2      TIMETZ,
    col_varbyte_2     VARBYTE,
    col_smallint_3    SMALLINT,
    col_integer_3     INTEGER,
    col_bigint_3      BIGINT,
    col_decimal_3     DECIMAL(5,2),
    col_real_3        REAL,
    col_double_3      DOUBLE PRECISION,
    col_boolean_3     BOOLEAN,
    col_char_3        CHAR(10),
    col_varchar_3     VARCHAR(255),
    col_date_3        DATE,
    col_timestamp_3   TIMESTAMP,
    col_timestamptz_3 TIMESTAMPTZ,
    col_geometry_3    GEOMETRY,
    col_geography_3   GEOGRAPHY,
    col_hllsketch_3   HLLSKETCH,
    col_super_3       SUPER,
    col_time_3        TIME,
    col_timetz_3      TIMETZ,
    col_varbyte_3     VARBYTE,
    PRIMARY KEY (col_smallint_2, col_integer_2, col_bigint_2, col_decimal_2, col_real_2, col_double_2, col_boolean_2, col_char_2, col_varchar_2, col_date_2, col_timestamp_2, col_timestamptz_2, col_time_2, col_timetz_2, col_varbyte_2)
)
DISTSTYLE AUTO
SORTKEY AUTO;

このddlをRedshift Serverlessで実行してみたいと思います。

table定義書で指定したdwhスキーマは今回は作成していないため、schema指定を外して実行しましたが、問題なく成功しました。

table definitionからも正常に作成されていることを確認できました。

最後に

table定義書のExcelファイルはあくまでサンプルですので、それぞれのプロジェクトに合わせた内容にpandas実装、environments.pyなどを修正し、役立てていただければと思います。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.